package com.ruyuan.little.project.elasticsearch.biz.common.dao.impl;

import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.dto.CommonResponse;
import com.ruyuan.little.project.elasticsearch.biz.common.constant.StringPoolConstant;
import com.ruyuan.little.project.elasticsearch.biz.common.dao.ElasticsearchDao;
import org.apache.commons.io.IOUtils;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.IOException;

/**
 * @author <a href="mailto:little@163.com">little</a>
 * version: 1.0
 * Description:elasticsearch实战
 **/
@Component
public class ElasticsearchDaoImpl implements ElasticsearchDao {

    /**
     * 日志组件
     */
    private final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchDaoImpl.class);

    /**
     * es rest client 客户端
     */
    @Autowired
    private RestHighLevelClient restClient;

    @Override
    public Boolean docExistFlag(String indexName, String id) throws IOException {
        GetRequest getRequest = new GetRequest(indexName,id);
        return restClient.exists(getRequest, RequestOptions.DEFAULT);
    }

    /**
     * 新增数据
     *
     * @param indexRequest 新增请求
     * @return 结果
     */
    @Override
    public CommonResponse insert(IndexRequest indexRequest) {
        try {
            IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);
            LOGGER.info("返回状态{}", indexResponse.status());
            if (indexResponse.status() == RestStatus.CREATED) {
                return CommonResponse.success(indexResponse.getId());
            }
        } catch (Exception e) {
            LOGGER.error("新增失败", e);
        }
        return CommonResponse.fail();
    }

    /**
     * 修改数据
     *
     * @param updateRequest 修改请求
     * @return 结果
     */
    @Override
    public CommonResponse update(UpdateRequest updateRequest) {
        try {
            UpdateResponse updateResponse = restClient.update(updateRequest, RequestOptions.DEFAULT);
            LOGGER.info("返回状态{}", updateResponse.status());
            if (updateResponse.status() == RestStatus.OK) {
                return CommonResponse.success();
            }
        } catch (Exception e) {
            LOGGER.error("修改失败", e);
        }
        return CommonResponse.fail();
    }

    /**
     * 删除数据
     *
     * @param deleteRequest 删除请求
     * @return 结果
     */
    @Override
    public CommonResponse delete(DeleteRequest deleteRequest) {
        try {
            DeleteResponse deleteResponse = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
            if (deleteResponse.status() == RestStatus.OK) {
                return CommonResponse.success();
            }
        } catch (Exception e) {
            LOGGER.error("删除失败", e);
        }
        return CommonResponse.fail();
    }

    /**
     * 批量操作数据
     *
     * @param bulkRequest 批量请求
     * @return 结果
     */
    @Override
    public CommonResponse bulk(BulkRequest bulkRequest) {
        try {
            BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            LOGGER.info("返回状态{}", bulkResponse.status());
            if (bulkResponse.status() == RestStatus.OK) {
                return CommonResponse.success();
            }
        } catch (IOException e) {
            LOGGER.error("批量操作失败", e);
        }
        return CommonResponse.fail();
    }

    /**
     * 根据id查询结果
     *
     * @param index 待查询的index
     * @param id    待查询的id
     * @return 结果
     */
    @Override
    public <T> T getById(String index, String id, Class<T> clazz){
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(new SearchSourceBuilder().query(new IdsQueryBuilder().addIds(id)));
        try {
            LOGGER.info("ES请求参数：{}", searchRequest.source().toString());
            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
            SearchHit[] searchHits = searchResponse.getHits().getHits();
            if (searchHits != null && searchHits.length == 1) {
                return JSON.parseObject(searchHits[0].getSourceAsString(), clazz);
            }
        } catch (IOException e) {
            LOGGER.error("根据id查询详情失败", e);
        }
        return null;
    }

    @Override
    public Boolean indexExistFlag(String indexName) throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        return restClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }


    @Override
    public Boolean createIndex(String indexName, String aliasName, Resource mappingResource) throws IOException {
        // 创建索引请求
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        // 索引别名不为空，添加索引别名
        if (StringUtils.hasLength(aliasName)){
            createIndexRequest.alias(new Alias(aliasName));
        }
        // 索引mapping
        String indexMappingJson = IOUtils.toString(mappingResource.getInputStream());
        createIndexRequest.mapping(indexMappingJson, XContentType.JSON);

        CreateIndexResponse createIndexResponse = restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        LOGGER.info("create index :{} alias :{}  response:{}", indexName, aliasName,createIndexResponse.isAcknowledged());
        return createIndexResponse.isAcknowledged();
    }

    @Override
    public void reindex(String oldIndexName, String newIndexName) throws IOException {
        ReindexRequest request = new ReindexRequest();
        // 源index
        request.setSourceIndices(oldIndexName);
        // 目标index
        request.setDestIndex(newIndexName);
        // scroll单次处理数据大小
        request.setSourceBatchSize(1000);
        // 可选值 create 和 index，默认index表示存在就修改，create表示不存在时才创建
        request.setDestOpType(StringPoolConstant.CREATE);
        // 可选值 proceed 和 abort，proceed冲突时继续，abort冲突时终止
        request.setConflicts(StringPoolConstant.PROCEED);
        // 请求结束后是否调用refresh
        request.setRefresh(true);
        restClient.reindex(request, RequestOptions.DEFAULT);
        LOGGER.info("reindex finished, oldIndex: {} newIndex: {}", oldIndexName, newIndexName);
    }

    @Override
    public Boolean addAlias(String indexName, String aliasName) throws IOException {
        IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
        IndicesAliasesRequest.AliasActions aliasAction =
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(indexName)
                        .alias(aliasName);
        aliasesRequest.addAliasAction(aliasAction);
        AcknowledgedResponse acknowledgedResponse = restClient.indices().updateAliases(aliasesRequest,RequestOptions.DEFAULT);
        LOGGER.info("add index alias , index: {} alias: {} response: {}", indexName, aliasName, acknowledgedResponse.isAcknowledged());
        return acknowledgedResponse.isAcknowledged();
    }

    @Override
    public Boolean changeAliasAfterReindex(String aliasName, String oldIndexName, String newIndexName) throws IOException {
        // 添加索引别名
        IndicesAliasesRequest.AliasActions addIndexAction = new IndicesAliasesRequest.AliasActions(
                IndicesAliasesRequest.AliasActions.Type.ADD).index(newIndexName).alias(aliasName);
        // 移除索引别名
        IndicesAliasesRequest.AliasActions removeAction = new IndicesAliasesRequest.AliasActions(
                IndicesAliasesRequest.AliasActions.Type.REMOVE).index(oldIndexName).alias(aliasName);

        IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
        indicesAliasesRequest.addAliasAction(addIndexAction);
        indicesAliasesRequest.addAliasAction(removeAction);
        AcknowledgedResponse indicesAliasesResponse = restClient.indices().updateAliases(indicesAliasesRequest,
                RequestOptions.DEFAULT);
        LOGGER.info("change index alias , alias: {}, remove index: {}, add index: {}, response: {}", aliasName,
                oldIndexName, newIndexName, indicesAliasesResponse.isAcknowledged());
        return indicesAliasesResponse.isAcknowledged();
    }

}
